Use base classes for AWS Lambda Operators/Sensors #34890
Conversation
Nice one!
template_fields: Sequence[str] = (
    "function_name",
    "runtime",
    "role",
    "handler",
    "code",
    "config",
    *AwsBaseOperator.template_fields,
)
Rather than adding this to every operator class, can we do it the other way around, where the base class already has this field with these defaults and we update it with anything new the concrete class wants to add (if anything)? It would need to be a mutable type of course; I'm not sure whether immutability is a requirement here or not.
I think we could only update it in the `__init__` method; at least I don't know another way to change the class attribute during creation, and I'm not sure whether it works in every case (especially for mapped tasks).
Another way is to make the parameter a set, but this has a side effect: sets don't preserve order, so it might generate a new serialised DAG each time it is parsed (maybe that's not so bad):
class Base:
    templates_fields: set[str] = {"a", "b", "c"}

class Child1(Base):
    ...

class Child2(Base):
    templates_fields = Base.templates_fields | {"d", "e"}

class Child3(Base):
    ...

print(" Step 1 ".center(72, "="))
print(f"Base.templates_fields={Base.templates_fields}")
print(f"Child1.templates_fields={Child1.templates_fields}")
print(f"Child2.templates_fields={Child2.templates_fields} - the order would change from run to run")
print(f"Child3.templates_fields={Child3.templates_fields}")

print(" Step 2: Update one of the child ".center(72, "="))
Child3.templates_fields.update({"f", "g"})
print(f"Base.templates_fields={Base.templates_fields}")
print(f"Child1.templates_fields={Child1.templates_fields}")
print(f"Child2.templates_fields={Child2.templates_fields} - the order would change from run to run")
print(f"Child3.templates_fields={Child3.templates_fields}")

print(" Step 3: Invalid operation ".center(72, "="))

class Child5(Base):
    templates_fields.add("h")  # We can't do that
================================ Step 1 ================================
Base.templates_fields={'b', 'c', 'a'}
Child1.templates_fields={'b', 'c', 'a'}
Child2.templates_fields={'e', 'b', 'd', 'c', 'a'} - the order would change from run to run
Child3.templates_fields={'b', 'c', 'a'}
=================== Step 2: Update one of the child ====================
Base.templates_fields={'b', 'f', 'g', 'c', 'a'}
Child1.templates_fields={'b', 'f', 'g', 'c', 'a'}
Child2.templates_fields={'e', 'b', 'd', 'c', 'a'} - the order would change from run to run
Child3.templates_fields={'b', 'f', 'g', 'c', 'a'}
====================== Step 3: Invalid operation =======================
Traceback (most recent call last):
...
NameError: name 'templates_fields' is not defined
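For comparison, the `__init__`-time update mentioned above could look roughly like this. This is only a hedged sketch: `AwsBase` and the field names are illustrative stand-ins, not the real Airflow classes. Shadowing the class attribute with an instance attribute leaves the class-level tuple (and other subclasses) untouched:

```python
from typing import Sequence

class AwsBase:
    # Hypothetical stand-in for AwsBaseOperator; field names are illustrative.
    template_fields: Sequence[str] = ("aws_conn_id", "region_name", "verify")

class LambdaOp(AwsBase):
    extra_template_fields: Sequence[str] = ("function_name", "runtime")

    def __init__(self) -> None:
        # Shadow the class attribute with an instance attribute, so the
        # base class tuple stays unchanged for every other subclass.
        self.template_fields = (*AwsBase.template_fields, *self.extra_template_fields)

op = LambdaOp()
print(op.template_fields)       # extended, per-instance
print(AwsBase.template_fields)  # base class unchanged
```

Whether serialization and mapped tasks pick up the instance-level attribute correctly is exactly the open question raised above.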
I just thought about another option: create a helper function to apply the default parameters:
from airflow.compat.functools import cache

@cache
def aws_template_fields(*templated_fields: str) -> tuple[str, ...]:
    return tuple(sorted({"aws_conn_id", "region_name", "verify"} | set(templated_fields)))

class SomeOperator:
    template_fields: Sequence[str] = aws_template_fields(
        "function_name",
        "runtime",
        "role",
        "handler",
        "code",
        "config",
    )
WDYT?
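As a quick sanity check of that helper (reproduced here with the stdlib `functools.cache` instead of the Airflow compat shim, so the snippet is self-contained), the result is a deduplicated, sorted tuple, which keeps the serialised order stable between parses:

```python
from functools import cache

@cache
def aws_template_fields(*templated_fields: str) -> tuple[str, ...]:
    # Merge the AWS base fields with operator-specific ones; sorting
    # guarantees a deterministic order regardless of set iteration order.
    return tuple(sorted({"aws_conn_id", "region_name", "verify"} | set(templated_fields)))

fields = aws_template_fields("function_name", "runtime", "aws_conn_id")
print(fields)
# → ('aws_conn_id', 'function_name', 'region_name', 'runtime', 'verify')
```

Note that duplicates passed by the concrete class (here `aws_conn_id`) are silently merged, and `@cache` means repeated calls with the same arguments return the very same tuple object.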
A variation on that might be to have each operator define `op_template_fields` and have the base operator's `template_fields` return base + op. Basically:

class BaseOperator:
    base_template_fields = ("aws_conn_id", "region_name", "verify")

    @cached_property
    def template_fields(self):
        return self.base_template_fields + self.op_template_fields

class MyOperator(BaseOperator):
    op_template_fields = ("foo", "bar")
It would also be nice to test this within a mapped task; otherwise we could bump into the same problem we have in BatchOperator.
> I just thought about another option: create a helper function to apply the default parameters

This is certainly nicer! There's less duplication, and the complexity of the defaults is hidden from users. It would be nice to have a fully backwards-compatible approach, but I'm happy to settle on this one.

> A variation on that might be to have each operator define "op_template_fields"

This would make the existing operators incompatible, since the field name would need to change; but we need to make other changes to them anyway to convert them to the base class approach, so maybe that's perfectly fine?
I've added this helper: 28aef3b
docs/apache-airflow-providers-amazon/_partials/generic_parameters.rst
Co-authored-by: Niko Oliveira <[email protected]>
@@ -152,7 +150,7 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
         return event["function_arn"]


-class LambdaInvokeFunctionOperator(BaseOperator):
+class LambdaInvokeFunctionOperator(AwsBaseOperator[LambdaHook]):
I'll admit this format for the inheritance is new to me, and I'm trying to read up on how it does what it does, but it appears that the value inside the square brackets here is always the same as the value assigned to `aws_hook_class` within the operator. I don't suppose there is a way to simplify that so you don't need both?
In the square brackets it is just `typing.Generic`, which helps type-annotate the `hook` property. The whole chain is:

- Define a TypeVar:

  AwsHookType = TypeVar("AwsHookType", bound=AwsGenericHook)

- Make the class generic in its definition (airflow/providers/amazon/aws/utils/mixins.py, lines 100-102 in e9987d5):

  class AwsBaseHookMixin(Generic[AwsHookType]):
      """Mixin class for AWS Operators, Sensors, etc."""

- Use the defined TypeVar as an annotation (mixins.py, lines 111-112 and 148-151 in e9987d5):

  # Should be assigned in child class
  aws_hook_class: type[AwsHookType]

  @cached_property
  @final
  def hook(self) -> AwsHookType:
      ...

In general, `class LambdaInvokeFunctionOperator(AwsBaseOperator[LambdaHook]):` helps IDEs and static checkers understand that `hook` will return a `LambdaHook` object (in our case we use this in only one place), and in addition it validates that `aws_hook_class` is a subclass of `LambdaHook`.
However, information about generics is not available at runtime and we can't extract it from the class definition, so we have to assign the actual class to `aws_hook_class` in order to construct the hook in the `hook` property.
@@ -0,0 +1,68 @@
.. Licensed to the Apache Software Foundation (ASF) under one
Nice addition
closes: #34781
Doc render samples:
- Generic Parameters
- Configure botocore.config.Config

Temporarily includes changes from "Extend hooks arguments into AwsBaseWaiterTrigger" #34884

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.